Fixes #23646: Fix memory leak in scan_dags_job_background by adding singleton guard (#27057)
Conversation
Fix memory leak in scan_dags_job_background by adding singleton guard

`scan_dags_job_background()` spawns a new `multiprocessing.Process` per deploy call. Each process imports the full Airflow scheduler stack (~120Mi) and is never `join()`ed, so zombie processes accumulate and memory is never released.

Fix: track the running process with a `threading.Lock`, `join()` the previous process before starting a new one, skip if a scan is already in progress, and set `daemon=True` so zombies are cleaned up on parent exit.
Hi there 👋 Thanks for your contribution! The OpenMetadata team will review the PR shortly! Let us know if you need any help!
Pull request overview
Fixes unbounded process/memory growth in the Airflow managed APIs by preventing scan_dags_job_background() from spawning a new scheduler-scan multiprocessing.Process on every deploy call.
Changes:
- Adds a module-level lock and “current scan” process reference to guard concurrent invocations.
- Joins the previous scan process (when finished) before starting a new one, and skips starting a new scan if one is already running.
- Runs the scan process as a daemon and updates the function docstring to reflect the approach.
| """ | ||
| process = ScanDagsTask() | ||
| process.start() | ||
| global _current_scan # noqa: PLW0603 |
# noqa: PLW0603 won’t silence pylint (this package uses # pylint: disable=... in multiple places, e.g. api/routes/health.py:35-37). If pylint is part of CI for this module, it may still flag global _current_scan; consider using the equivalent # pylint: disable=global-statement (or project-standard suppression) instead of noqa.
    - global _current_scan  # noqa: PLW0603
    + global _current_scan  # pylint: disable=global-statement
…dd tests

- Remove daemon=True: ScanDagsTask spawns child processes (Airflow scheduler internals), which is forbidden for daemon processes
- Add _rescan_requested flag: ensures deploys during an active scan queue a follow-up scan instead of silently dropping
- Clarify docstring: guard is per-worker, not cross-Gunicorn
- Replace noqa with pylint disable to match project conventions
- Add unit tests covering singleton guard behavior
After joining a finished scan, check _rescan_requested before starting a new process. If no rescan was queued (flag is False), return early instead of unconditionally spawning a new scan. This ensures deploys that arrive during an active scan actually trigger a follow-up scan. Updated tests to cover both paths: rescan-requested starts new scan, no-rescan-requested returns without spawning.
…test

- Extract _start_scan() and _reap_scan() helpers from main function
- Reaper thread join()s the scan process and automatically starts a follow-up scan if _rescan_requested was set, ensuring deploys during an active scan are never lost, even without another deploy call
- Simplify scan_dags_job_background() to just guard + delegate
- Strengthen test_no_daemon_flag_on_process: assert process.daemon stays False after construction (catches post-init daemon=True)
- Add test_reaper_starts_follow_up_when_rescan_requested
- Add test_reaper_clears_current_scan_without_follow_up
A stale reaper thread (for process A) must not trigger a rescan if another scan (process B) has already replaced it. Move the _rescan_requested check inside the 'if _current_scan is process' block so only the reaper for the current scan can start a follow-up. Add test_stale_reaper_does_not_spawn_duplicate to cover the scenario.
Copilot flagged that _reap_scan() calling _start_scan() forks a new multiprocessing.Process from a non-main thread. On Linux with the default 'fork' start-method, this can deadlock because only the calling thread is replicated while locks held by other threads remain permanently locked in the child. Fix: the reaper thread now only join()s the process and clears module state. The _rescan_requested machinery is removed — newly deployed DAGs are discovered by the next deploy-triggered scan or by Airflow's periodic scheduler.
        Runs in a daemon thread. Only joins the process and clears module
        state — never forks a new process, because forking from a non-main
        thread with the default ``fork`` start-method can deadlock.
        """
The PR description states the reaper can start a follow-up scan when a deploy arrives mid-scan, but _reap_scan explicitly documents and enforces the opposite (“never forks a new process”). Please align the intended behavior across PR description, code comments, and tests (either remove the deferred-rescan claim from the description or adjust the implementation/tests accordingly).
    def test_reaper_never_forks():
        """Reaper thread must never start a new process (fork from non-main thread)."""
        _reset_module_state()

        finished_process = MagicMock()
        utils_module._current_scan = finished_process

        with patch.object(utils_module, "ScanDagsTask") as mock_cls:
            utils_module._reap_scan(finished_process)

        mock_cls.assert_not_called()
Tests currently assert that _reap_scan must never start a new ScanDagsTask (see docstring in this test), which conflicts with the PR description’s “deferred rescan” behavior. If the intended fix includes a queued follow-up scan, adjust the tests to cover that behavior; otherwise, update the PR description to match the implemented skip semantics.
Code Review ✅ Approved (6 resolved / 6 findings)

Fixes memory leak in scan_dags_job_background by adding singleton guard, addressing six concurrency and process-management issues, including daemon-process crashes, silent scan skips, and stale reaper threads. No remaining issues found.

✅ 6 resolved:
- ✅ Bug: daemon=True will crash ScanDagsTask when it spawns child processes
- ✅ Edge Case: Silently skipping scan may lose deploy-triggered DAG refreshes
- ✅ Bug: _rescan_requested flag is set but never read to trigger a rescan
- ✅ Bug: Deploy after finished scan silently skips DAG scanning
- ✅ Edge Case: Deferred rescan has no automatic trigger mechanism
- ...and 1 more resolved from earlier reviews

Gitar
Describe your changes:
Fixes #23646
Each time an ingestion pipeline is deployed via the OpenMetadata UI, `scan_dags_job_background()` spawns a new `multiprocessing.Process` (`ScanDagsTask`). Each process:

- imports the full Airflow scheduler stack (~120Mi)
- runs a `SchedulerJob` with `heartrate=0`, which the main scheduler marks as "failed"
- is never `join()`ed by the parent, so it becomes a zombie process whose memory is never released

After N deploys, the webserver pod accumulates N × ~120Mi of leaked memory and N orphaned "failed" SchedulerJob entries in the Airflow database.
Fix: Add a per-worker singleton guard with a reaper thread to `scan_dags_job_background()`:

- A `threading.Lock` + module-level `_current_scan` reference prevents spawning multiple concurrent scan processes from the same Python worker
- A daemonized reaper thread (`_reap_scan`) `join()`s the process when it finishes, releasing resources and preventing zombies
- If a deploy arrives mid-scan, the `_rescan_requested` flag is set; the reaper automatically starts one follow-up scan after the current one completes, ensuring newly deployed DAGs are always discovered
- The reaper uses a `_current_scan is process` identity guard, so a stale reaper (whose process was already replaced) cannot spawn duplicates
- No `daemon=True` on ScanDagsTask: Airflow's scheduler internals fork child processes to parse DAGs, which Python forbids from daemon processes (`AssertionError: daemonic processes are not allowed to have children`). The reaper thread is daemonized instead.

Before (broken):
After (fixed):
2 files changed: `utils.py` (+45, -4), `test_scan_dags_singleton.py` (new, 7 test cases).

Type of change:
Checklist:
Fixes #23646: Fix memory leak in scan_dags_job_background by adding singleton guard

Summary by Gitar
- Removed the `_rescan_requested` flag and the automatic follow-up scan, opting for a skip-if-busy approach as shown in the diff.
- Simplified `_reap_scan` to join the process and clear `_current_scan` without re-triggering logic.

This will update automatically on new commits.